Spark SQL

适用于版本2.3.0

1 概览

(1) SQL

查询结果以Dataset/DataFrame形式返回。

可以通过命令行command-lineJDBC/ODBC连接

(2) Dataset和DataFrame

1) Dataset

DataSet是分布式数据集合。

可从JVM对象生成。

Python和R暂时只能间接实现类似功能。

2) DataFrame

是使用命名的列组织的Dataset.

等同于关系型数据库中的表和Pathon/R中的data frame.

Scala中表示为Dataset[Row]的别名DataFrame,Java中表示为Dataset\

2 入门

(1) Spark Session

SparkSession是Spark SQL的入口。

在版本2.0中提供了Hive支持。如使用HiveQL查询、访问Hive UDF和从Hive表中读取数据。

1
2
3
4
5
6
7
8
9
10
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder()
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

(2) 创建DataFrame

从JSON数据中创建

1
2
3
4
5
6
7
8
9
10
11
val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

(3) DataFrame操作

即无类型数据集操作

示例如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+

(4) 编码SQL查询

sql语句作为参数传递。示例如下

1
2
3
4
5
6
7
8
9
10
11
12
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

(5) 全局临时视图

通常,临时视图的生命周期仅限于当前会话。

全局临时视图将视图绑定到系统保存的数据库global_temp。使用时必须使用限定的名称引用。

全局临时视图可以在会话间共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

(6) 创建Dataset

Dataset适用于RDD不同的序列化方式Encoder,能够直接过滤、排序或哈希,而不需要反序列化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+

(7) RDD间操作

RDD转换为DataFrame

  • 反射推断

    适用于运行前已知模式

    使用样例类

  • 编程指定

    适用于运行前未知模式

    使用StructureType

1) 使用反射推断模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
.textFile("examples/src/main/resources/people.txt")
.map(_.split(","))
.map(attributes => Person(attributes(0), attributes(1).trim.toInt))
.toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

2) 指定模式

构建模式。

  • 创建模式
  • 将记录转换为行RDD
  • 应用模式到行RDD
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// 构建模式
// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
.map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// 记录转换为行
// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
.map(_.split(","))
.map(attributes => Row(attributes(0), attributes(1).trim))

// 应用模式
// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// | value|
// +-------------+
// |Name: Michael|
// | Name: Andy|
// | Name: Justin|
// +-------------+

(8) 聚合

1) 无类型用户自定义聚合函数

通过实现抽象类UserDefinedAggregateFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._

// 实现UserDefinedAggregateFunction
object MyAverage extends UserDefinedAggregateFunction {
// Nil表示空集合,::表示前者(元素)追加到后者(集合)
// Data types of input arguments of this aggregate function
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
// Data types of values in the aggregation buffer
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// The data type of the returned value
def dataType: DataType = DoubleType
// Whether this function always returns the same output on the identical input
def deterministic: Boolean = true
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
// 使用输入数据更新
// Updates the given aggregation buffer `buffer` with new input data from `input`
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// Calculates the final result
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}

// Register the function to access it
spark.udf.register("myAverage", MyAverage)

val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+

2) 类型安全用户自定义聚合函数

用于与强类型数据集交互

通过实现 Aggregator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)

// 实现Aggregator
object MyAverage extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// +-------+------+
// | name|salary|
// +-------+------+
// |Michael| 3000|
// | Andy| 4500|
// | Justin| 3500|
// | Berta| 4000|
// +-------+------+

// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()
// +--------------+
// |average_salary|
// +--------------+
// | 3750.0|
// +--------------+

3 数据源

将DataFrame注册为临时视图,可以在其上进行SQL查询。

(1) 通用加载/保存函数

默认使用parquet作为数据源,可通过spark.sql.sources.default修改。

1
2
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

1) 人工指定选项

数据源通过全限定名指定(如org.apache.spark.sql.parquet),内建的数据源可以使用短名称(如json, parquet, jdbc, orc, libsvm, csv, text)

1
2
3
4
5
6
7
8
9
10
// JSON
val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

// CSV
val peopleDFCsv = spark.read.format("csv")
.option("sep", ";")
.option("inferSchema", "true")
.option("header", "true")
.load("examples/src/main/resources/people.csv")

2) 直接在文件上执行SQL

不用加载到DataFrame,就可以执行SQL查询

1
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

3) 保存模式

指定数据存在时的行为。

以下行为不是原子的,也没有使用锁。

覆盖时,先删除再写入。

image-20200706112327573

4) 保存到持久化表中

持久化表在Spark程序重启后依旧可用,不同于临时视图。

DataFrame可以使用saveAsTable方法持久化到Hive Metastore中。

通过SparkSession的table方法按名称调用持久化表。

不需要单独部署Hive。Spark将创建使用Derby创建本地Hive Metastore。

基于文件的数据源,可以指定路径。如df.write.option(“path”, “/some/path”).saveAsTable(“t”)。删除表后,路径和文件依旧存在。

没有指定路径时,Spark将数据写入到仓库目录的默认表路径。删除表后,默认表路径也删除。

版本>=2.1,持久化表具有存储在Hive Metastore中的分区元数据。可以:

  • 不需要在首次查询时扫描所有分区,因为Metastore可以只返回需要的分区。
  • 可以使用Hive DDL

注意:创建外部数据源表(使用path选项)时,默认不聚集分区信息。可以使用MSCK REPAIR TABLE同步。

5) 分组(Bucket)、排序和分区

分组和排序只能用于持久化表。

分组将数据分布在固定数量的桶中。

分区对唯一值数量敏感,即对具有高基数的列的适用性有限。

可以同时使用分组和分区。

Spark 3.0

partittionBy创建了分区发现一节中描述的目录结构,因此限制了高基数的列的可用性。

bucketBy在固定数量桶中分散数据,可以用于唯一值数量没有边界的的场景

注意:bucketBy使用了列的哈希值分桶,保证具有相同哈希值的记录在同一个桶中,可以避免shuffle。

1
2
3
4
5
6
7
8
9
10
peopleDF.write.bucketBy(42, "name").sortBy("age").saveAsTable("people_bucketed")

usersDF.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

// 同时使用分组和分区
usersDF
.write
.partitionBy("favorite_color")
.bucketBy(42, "name")
.saveAsTable("users_partitioned_bucketed")

(2) Parquet文件

Parquet是列格式的数据。自描述格式,保存有模式信息。

当写入Parquet文件文件时,为了适配,将列自动转换为可为空。

1) 数据加载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// | value|
// +------------+
// |Name: Justin|
// +------------+

2)分区发现

Hive之类的系统使用表分区作为一种通用优化手段。

分区表使用分区列将数据分散到不同的目录中。

内建文件数据源(Text/CSV/JSON/ORC/Parquet)支持自动发现和推断分区信息。

如使用gender和county作为分区列,分区表目录如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...

├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...

传递参数path/to/table到SparkSession.read.parquet或SparkSession.read.load,可以自动从路径中提取分区和模式信息。提取的模式信息如下:

1
2
3
4
5
root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

分区列数据类型当前支持数值、日期、时间戳和字符串类型。

通过spark.sql.sources.partitionColumnTypeInference.enabled关闭自动推断。关闭后,将使用字符串作为分区列类型。

版本>=1.6,默认只发现传递的目录参数中的分区。子目录也不能发现。可以通过数据源的basePath选项更改。

3) 模式合并

自动检测并合并兼容的Parquet数据源。

版本>=1.5,默认关闭。

开启方法:

1 读取时,设置数据源选项mergeSchema为true

2 设置全局SQL选项spark.sql.parquet.mergeSchema为true

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// This is used to implicitly convert an RDD to a DataFrame.
import spark.implicits._

// Create a simple DataFrame, store into a partition directory
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("data/test_table/key=2")

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths
// root
// |-- value: int (nullable = true)
// |-- square: int (nullable = true)
// |-- cube: int (nullable = true)
// |-- key: int (nullable = true)

4) Hive元数据存储Parquet表转换

为了性能,Spark使用自身的Parquet支持替代Hive的SerDe。通过spark.sql.hive.convertMetastoreParquet开关。

1’ Hive/Parquet模式调和(reconciliation)

Hive与Parquet模式区别:

  • Hive大小写敏感,而Parquet不是。
  • Hive所有列是可为空的,而Parquet不是。

调和规则:

  • 同名列必须数据类型相同,并且是否为空和Parquet相同。
  • 字段与Hive Metastore保持一致
    • 仅在Parquet中出现的字段被删除
    • 仅在Hive中出现的字段被添加
2’ 元数据刷新

为了性能,Spark SQL缓存元数据。需要人工刷新:

1
2
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")

5) 配置

  • 使用SparkSession的setConf
  • 在SQL脚本中,使用SET key=value

image-20200708172238345

(3) ORC文件

版本>=2.3,支持一种矢量化的ORC读取器,用于读取ORC文件。

  • 本地ORC表(如使用USING ORC创建的)

    spark.sql.orc.impl->native

    spark.sql.orc.enableVectorizedReader->true

  • Hive(如使用USING HIVE OPTIONS创建的)

    spark.sql.hive.convertMetastoreOrc->true

image-20200708173123271

(4) JSON数据集

Spark使用的JSON格式与典型格式有所区别,详见JSON Lines text format, also called newline-delimited JSON.

其采用UTF-8编码,要求每一行采用\n分隔,并且每一行都是合法的JSON值。

使用多行JSON,需要设置multiLine为true.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// Primitive types (Int, String, etc) and Product types (case classes) encoders are supported by importing this when creating a Dataset.
import spark.implicits._

// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files
val path = "examples/src/main/resources/people.json"
val peopleDF = spark.read.json(path)

// The inferred schema can be visualized using the printSchema() method
peopleDF.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by spark
val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
// +------+
// | name|
// +------+
// |Justin|
// +------+

// Alternatively, a DataFrame can be created for a JSON dataset represented by
// a Dataset[String] storing one JSON object per string
val otherPeopleDataset = spark.createDataset(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val otherPeople = spark.read.json(otherPeopleDataset)
otherPeople.show()
// +---------------+----+
// | address|name|
// +---------------+----+
// |[Columbus,Ohio]| Yin|
// +---------------+----+

(5) Hive表

Hive依赖库需要在所有节点可用,如使用序列化与反序列化库

Hive配置:将hive-site.xml(用于连接外部Hive)、core-site.xml(用于安全配置)和hdfs-site.xml(用于HDFS访问)放置到conf目录中。

初始化SparkSession时需要开启Hive支持。其中包括连接到持久化Hive Metastore、Hive Serdes和用户自定义函数。

没有hive-site.xml配置时,Spark在spark.sql.warehouse.dir(默认是程序运行目录下的spark-warehouse文件夹)创建metastore_db。

版本>=2.0, Spark使用spark.sql.warehouse.dir替换Hive配置中的hive.metastore.warehouse.dir。

启动程序的用户需要授予写权限。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
import java.io.File

import org.apache.spark.sql.{Row, SaveMode, SparkSession}

case class Record(key: Int, value: String)

// warehouseLocation points to the default location for managed databases and tables
val warehouseLocation = new File("spark-warehouse").getAbsolutePath

val spark = SparkSession
.builder()
.appName("Spark Hive Example")
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport() // 开启Hive支持
.getOrCreate()

import spark.implicits._
import spark.sql

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL
sql("SELECT * FROM src").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Aggregation queries are also supported.
sql("SELECT COUNT(*) FROM src").show()
// +--------+
// |count(1)|
// +--------+
// | 500 |
// +--------+

// The results of SQL queries are themselves DataFrames and support all normal functions.
val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

// The items in DataFrames are of type Row, which allows you to access each column by ordinal.
val stringsDS = sqlDF.map {
case Row(key: Int, value: String) => s"Key: $key, Value: $value"
}
stringsDS.show()
// +--------------------+
// | value|
// +--------------------+
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// |Key: 0, Value: val_0|
// ...

// You can also use DataFrames to create temporary views within a SparkSession.
val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
recordsDF.createOrReplaceTempView("records")

// Queries can then join DataFrame data with data stored in Hive.
sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
// +---+------+---+------+
// |key| value|key| value|
// +---+------+---+------+
// | 2| val_2| 2| val_2|
// | 4| val_4| 4| val_4|
// | 5| val_5| 5| val_5|
// ...

// Create a Hive managed Parquet table, with HQL syntax instead of the Spark SQL native syntax
// `USING hive`
sql("CREATE TABLE hive_records(key int, value string) STORED AS PARQUET")
// Save DataFrame to the Hive managed table
val df = spark.table("src")
df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
// After insertion, the Hive managed table has data now
sql("SELECT * FROM hive_records").show()
// +---+-------+
// |key| value|
// +---+-------+
// |238|val_238|
// | 86| val_86|
// |311|val_311|
// ...

// Prepare a Parquet data directory
val dataDir = "/tmp/parquet_data"
spark.range(10).write.parquet(dataDir)
// Create a Hive external Parquet table
sql(s"CREATE EXTERNAL TABLE hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
// The Hive external table should already have data
sql("SELECT * FROM hive_ints").show()
// +---+
// |key|
// +---+
// | 0|
// | 1|
// | 2|
// ...

// Turn on flag for Hive Dynamic Partitioning
spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
// Create a Hive partitioned table using DataFrame API
df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")
// Partitioned column `key` will be moved to the end of the schema.
sql("SELECT * FROM hive_part_tbl").show()
// +-------+---+
// | value|key|
// +-------+---+
// |val_238|238|
// | val_86| 86|
// |val_311|311|
// ...

spark.stop()

注意:

  • Hive版本>=0.9默认开启自动分区。严格模式需要指定静态分区列,而非严格模式不需要。默认有相应的分区数量限制。
  • Hive 3.0不需要指定动态分区列,因为会自动创建。

Dynamic Partition Inserts

1) 指定存储格式

创建Hive表时,需要指定存储格式和序列化方式,如 CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')

读取默认按照文本格式。

当前不直接支持Hive storage handler。可以先在Hive端创建表,Spark SQL再读取

image-20200709112824948

2) 与不同版本Hive Metastore交互

Spark可以通过配置与不同的Hive Metastore交互。

在内部,Spark基于Hive1.2.1编译,使用其内部执行,如serdes、UDF和UDAF

image-20200709114550299

注意:Spark 3.0提供了对Hive3.0的支持。

(6) JDBC连接其他数据库

Spark可以使用JDBC连接数据库。尤其是使用JdbcRDD时。

不同于Spark SQL JDBC Server(用于向外提供Spark SQL查询能力),不用提供ClassTag。

需要将相应的驱动放置在类路径下。

如:

1
bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

连接时,需要提供用户名和密码。

根据需要进行以下配置: 大小写敏感

image-20200709115759302

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
// 方法一:使用option
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.load()

// 方法二:通过jdbc
val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write
.format("jdbc")
.option("url", "jdbc:postgresql:dbserver")
.option("dbtable", "schema.tablename")
.option("user", "username")
.option("password", "password")
.save()

jdbcDF2.write
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write
.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

(7) 故障排除

  • 在所有节点的compute_classpath.sh包含驱动程序

    因为Java的DriveManager在安全检查时会忽略对原始类加载器不可见的驱动。

  • 一些数据库将所有名称转换为大写,Spark SQL需要使用大写名称引用。

4 性能优化

(1) 内存缓存

使用spark.catalog.cacheTable(“tableName”)或dataFrame.cache()缓存表。

spark.catalog.uncacheTable(“tableName”)释放缓存。

Spark只扫描需要的列并自动压缩,以减少内存占用和GC压力。

可以使用setConf或SQL的SET key=value配置。

image-20200710112439772

(2) 其他配置选项

以下优化选项可能在后续版本中移除:

image-20200710112544835

(3) SQL查询的广播提示

Spark广播可用于表或视图间的join操作。

即使广播数据量超过spark.sql.autoBroadcastJoinThreshold配置,Spark还是会使用broadcast hash join(BHJ)。

当两张表都广播时,Spark会广播统计只较小的一个。

对于不支持BHJ的情形(如full outer join),不保证使用BHJ。

嵌套循环的广播依然遵守该规则。

1
2
import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src")).join(spark.table("records"), "key").show()

5 分布式SQL引擎

可以通过JDBC、ODBC或命令行,使用Spark SQL作为分布式执行引擎,无需编写其他代码。

(1) 运行Thrift JDBC/ODBC服务器

Thrift JDBC/ODBC对应于Hive 1.2.1中的HiveServer2。可以使用Spark或Hive中的beeline脚本测试。

在Spark目录中启动服务器,接收所有bin/spark-submit的参数和配置Hive的–hiveconf

1
./sbin/start-thriftserver.sh

默认监听localhost:10000,可通过以下方式修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 方法一:环境变量
export HIVE_SERVER2_THRIFT_PORT=<listening-port>
export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
./sbin/start-thriftserver.sh \
--master <master-uri> \
...

// 方法二:系统属性
./sbin/start-thriftserver.sh \
--hiveconf hive.server2.thrift.port=<listening-port> \
--hiveconf hive.server2.thrift.bind.host=<listening-host> \
--master <master-uri>
...

使用beeline测试:

1
2
3
4
5
// 启用beeline
./bin/beeline

// 连接服务器
beeline> !connect jdbc:hive2://localhost:10000

需要输入用户名和密码:

服务器支持使用HTTP发送thrift RPC 消息,可通过系统属性或hive-site.xml配置。

1
2
3
4
5
6
hive.server2.transport.mode - Set this to value: http
hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
hive.server2.http.endpoint - HTTP endpoint; default is cliservice

// 使用beeline连接
beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

(2) 运行Spark SQL CLI

用于在本地模式运行Hive Metastore服务。

不能与Thrift JDBC 服务器交互。

1
2
// 启用CLI
./bin/spark-sql

6 使用Apache Arrow、用于Pandas的PySpark指南

7 迁移指南

(1) 版本迁移

(2) Apache Hive兼容性

版本2.3.0基于Hive 1.2.1支持。

可以通过配置,支持版本0.12.0-2.1.1

1) 在已有Hive仓库中部署

不用更改Hive配置

2) 支持的Hive特性

image-20200710115708664

3) 不支持的Hive特性

image-20200710115929340

8 参考

(1) 数据类型

image-20200710120047554

image-20200710120107513

(2) NaN语义

表示not a number,用于浮点型或双精度类型。

  • NaN = NaN,结果为true
  • 聚合时,所有NaN分为一组
  • 连接时,被当做普通值处理
  • 升序排列时,排在所有数值之后。

参考资料

高基数:集合的基数,是其元素个数概念的推广。

Spark SQL, DataFrames and Datasets Guide

scala中:: , +:, :+, :::, +++的区别

scala中Nil用法

高基数是什么意思